package com.sensei.search.nodes;

import java.net.InetAddress;
import java.net.InetSocketAddress;

import org.apache.log4j.Logger;

import com.linkedin.norbert.cluster.javaapi.Cluster;
import com.linkedin.norbert.cluster.javaapi.ClusterConfig;
import com.linkedin.norbert.cluster.javaapi.DefaultCluster;
import com.linkedin.norbert.network.NetworkingException;
import com.linkedin.norbert.network.javaapi.MessageHandler;
import com.linkedin.norbert.network.javaapi.NetworkServer;
import com.linkedin.norbert.network.javaapi.ServerBootstrap;
import com.linkedin.norbert.network.javaapi.ServerConfig;

/**
 * A single search node: builds a Norbert {@link ServerBootstrap} from the
 * supplied settings, registers the node (id, local address/port, partitions)
 * with the cluster, and binds the network server.
 *
 * <p>Lifecycle: construct, call {@link #startup()}, and later
 * {@link #shutdown()}. No synchronization is performed by this class.
 */
public class SenseiNode{
	private static final Logger logger = Logger.getLogger(SenseiNode.class);
	
	private final String _zookeeperURL;
	private final int _id;
	// Set by startup(); stays null until then so shutdown() can tell whether
	// there is anything to deregister from.
	private Cluster _cluster;
	private final SenseiNodeMessageHandler _msgHandler;
	private final int[] _partitions;
	private final String _clusterName;
	private ServerBootstrap _bootstrap;
	private NetworkServer _server;
	private final int _port;
	
	/**
	 * Stores the node settings; no network or cluster activity happens here.
	 *
	 * @param clusterName  cluster name handed to the server configuration
	 * @param id           node id used for the server configuration and for
	 *                     adding/removing this node in the cluster
	 * @param port         port advertised when the node is added to the cluster
	 * @param partitions   partitions passed along when the node is added
	 *                     (the array is stored as-is, not copied)
	 * @param msgHandler   message handler installed on the server
	 * @param zookeeperURL ZooKeeper URL(s) handed to the server configuration
	 */
	public SenseiNode(String clusterName,int id,int port,int[] partitions,SenseiNodeMessageHandler msgHandler,String zookeeperURL){
		_id = id;
		_port = port;
		_partitions = partitions;
		_msgHandler = msgHandler;
		_zookeeperURL = zookeeperURL;
		_clusterName = clusterName;
		_cluster = null;
	}

	/**
	 * Creates the bootstrap, registers this node with the cluster and binds
	 * the network server.
	 *
	 * <p>A failure while connecting to or registering with the cluster is
	 * logged as a warning and startup continues. A {@link NetworkingException}
	 * from {@code bind()} is logged as an error and both the server and the
	 * bootstrap are shut down.
	 *
	 * @throws Exception if building the bootstrap or obtaining the cluster or
	 *                   network server from it fails
	 */
	public void startup() throws Exception{
		ServerConfig serverConfig = new ServerConfig();
		serverConfig.setClusterName(_clusterName);
		serverConfig.setZooKeeperUrls(_zookeeperURL);
		serverConfig.setMessageHandlers(new MessageHandler[]{_msgHandler});
		
		serverConfig.setNodeId(_id);

		_bootstrap = new ServerBootstrap(serverConfig);
		
		// Keep the cluster in the field (not just a local) so that shutdown()
		// can remove this node again.
		_cluster = _bootstrap.getCluster();
		try{
			logger.info("waiting to connect to cluster...");
			_cluster.awaitConnection();
			_cluster.addNode(_id, new InetSocketAddress(InetAddress.getLocalHost(),_port),_partitions);
		}
		catch(Exception e){
			// Best effort: log with the full stack trace and carry on to bind.
			logger.warn(e.getMessage(), e);
		}
		
		_server = _bootstrap.getNetworkServer();
		
		try {
			logger.info("binding server ...");
			_server.bind();
			logger.info("started...");
		} catch (NetworkingException e) {
			// NOTE(review): the failure is logged but not rethrown, so callers
			// cannot tell that startup did not succeed — confirm this is intended.
			logger.error("unable to bind to port: "+_port, e);
			_server.shutdown();
			_bootstrap.shutdown();
		}
		
	}
	
	/**
	 * Shuts down the network server, then the bootstrap, then removes this
	 * node from the cluster. Each step is attempted even if an earlier one
	 * failed, and steps whose target was never created (for example when
	 * {@link #startup()} was not called) are skipped.
	 *
	 * @throws Exception if shutting down the bootstrap or removing the node
	 *                   from the cluster fails; a failure while shutting down
	 *                   the server is only logged
	 */
	public void shutdown() throws Exception{
		logger.info("shutting down...");
		try{
			if (_server != null){
				_server.shutdown();
			}
		}
		catch(Exception e){
			logger.warn(e.getMessage(), e);
		}
		finally{
			try{
				if (_bootstrap != null){
					_bootstrap.shutdown();
				}
			}
			finally{
				// NOTE(review): removal happens after the bootstrap is shut down,
				// as in the original ordering — confirm the cluster handle is
				// still usable at this point.
				if (_cluster != null){
					_cluster.removeNode(_id);
				}
			}
		}
	}
}
